package com.xhy.documents_collection.scheduler;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.xhy.documents_collection.entity.PO.task.TaskTimingRecord;
import com.xhy.documents_collection.enums.TimingEnum;
import com.xhy.documents_collection.holder.UserHolder;
import com.xhy.documents_collection.service.async.AsyncService;
import com.xhy.documents_collection.service.task.TaskTimingRecordService;
import com.xhy.documents_collection.utils.SpringUtil;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;

/**
 * Author: Xhy
 * CreateTime: 2023-03-17 10:23
 *
 * Delay-queue based scheduler.
 *
 * <p>Holds {@link TaskTimingRecord}s in a {@link DelayQueue} and runs a single daemon
 * worker thread (named {@code "delay"}) that takes each record once its delay has
 * elapsed and hands it to {@link AsyncService#addSchedulerTask}. On start-up the worker
 * reloads every record whose state is not {@code TimingEnum.SUCCESS} so that pending
 * tasks survive a restart.
 *
 * <p>The class is a lazily created singleton; obtain it via
 * {@link #getInstance(TaskTimingRecordService)}.
 */
public class DelayQueueScheduler implements Scheduler<TaskTimingRecord>{

    /**
     * Time (in milliseconds) the worker thread waits before its first use of other
     * beans, to give them a chance to finish initializing.
     */
    private static final long STARTUP_DELAY_MILLIS = 3000L;

    /** The lazily created singleton instance; guarded by the class lock in getInstance. */
    private static DelayQueueScheduler delayQueueScheduler;

    /**
     * Pending timing records. Created eagerly (not on the worker thread) so that
     * add/update/delete never observe a null queue, no matter how early they are called.
     */
    private final BlockingQueue<TaskTimingRecord> queue = new DelayQueue<>();

    /** Service used to load unfinished records at start-up and to look records up on update. */
    TaskTimingRecordService taskTimingRecordService;

    /**
     * Creates the scheduler and starts its worker thread.
     *
     * @param taskTimingRecordService service used for record persistence lookups
     */
    private DelayQueueScheduler(TaskTimingRecordService taskTimingRecordService){
        this.taskTimingRecordService = taskTimingRecordService;
        init();
    }

    /**
     * Returns the singleton scheduler, creating it (and starting its worker thread) on
     * the first call. The service passed on later calls is ignored because the instance
     * already exists.
     *
     * @param taskTimingRecordService service handed to the scheduler on first creation
     * @return the shared scheduler instance
     */
    public static synchronized DelayQueueScheduler getInstance(TaskTimingRecordService taskTimingRecordService){
        if (delayQueueScheduler == null){
            delayQueueScheduler = new DelayQueueScheduler(taskTimingRecordService);
        }
        return delayQueueScheduler;
    }

    /**
     * Starts the daemon worker thread that performs disaster recovery (reloading
     * unfinished records) and then dispatches timed tasks as they become due.
     */
    private void init(){
        Thread worker = new Thread(() -> {
            try {
                // Need to wait for the other beans to be initialized.
                Thread.sleep(STARTUP_DELAY_MILLIS);
            } catch (InterruptedException e) {
                // Interrupted before start-up finished: keep the flag set and stop.
                Thread.currentThread().interrupt();
                return;
            }
            recoverUnfinishedTasks();
            dispatchLoop();
        });
        worker.setDaemon(true);
        worker.setName("delay");
        worker.start();
    }

    /**
     * Disaster-recovery initialization: re-queues every stored record whose state is
     * not {@code TimingEnum.SUCCESS}.
     */
    private void recoverUnfinishedTasks(){
        List<TaskTimingRecord> unfinished = taskTimingRecordService.list(new LambdaQueryWrapper<TaskTimingRecord>()
                .ne(TaskTimingRecord::getState, TimingEnum.SUCCESS.type));
        for (TaskTimingRecord timingRecord : unfinished) {
            // NOTE(review): setExpire() presumably recomputes the record's delay deadline — confirm.
            timingRecord.setExpire();
            queue.add(timingRecord);
        }
    }

    /**
     * Blocks on the queue and forwards each due record to {@link AsyncService} until the
     * worker thread is interrupted.
     */
    private void dispatchLoop(){
        while (!Thread.currentThread().isInterrupted()){
            try {
                TaskTimingRecord due = queue.take();
                SpringUtil.getBean(AsyncService.class).addSchedulerTask(due);
            } catch (InterruptedException e) {
                // Restore the interrupt status and leave the loop instead of spinning on.
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                // A single failing task must not kill the worker; report it and carry on.
                e.printStackTrace();
            }
        }
    }

    /**
     * Schedules a record.
     *
     * @param taskTimingRecord the record to enqueue
     */
    @Override
    public void add(TaskTimingRecord taskTimingRecord) {
        queue.add(taskTimingRecord);
    }

    /**
     * Replaces the queued entry for the given record with the updated one, after copying
     * the stored task name and the current user id onto it.
     *
     * <p>Does nothing when the queue is empty or when no stored record with the given id
     * exists.
     *
     * @param taskTimingRecord the updated record; its id identifies the stored record
     */
    @Override
    public void update(TaskTimingRecord taskTimingRecord) {
        // NOTE(review): updates are ignored while the queue is empty — kept as in the
        // original; confirm this is intended.
        if (queue.isEmpty()){
            return;
        }
        TaskTimingRecord stored = taskTimingRecordService.getById(taskTimingRecord.getId());
        if (stored == null){
            // The record no longer exists; there is nothing to reschedule.
            return;
        }
        taskTimingRecord.setTaskName(stored.getTaskName());
        taskTimingRecord.setUserId(UserHolder.get());
        // Remove-then-add so the queue holds the updated record.
        // NOTE(review): relies on TaskTimingRecord.equals matching the old entry
        // (presumably by id) — confirm.
        queue.remove(taskTimingRecord);
        queue.add(taskTimingRecord);
    }

    /**
     * Removes the queued records with the given ids.
     *
     * @param ids ids of the records to remove; {@code null} is treated as empty
     */
    @Override
    public void delete(Collection<Integer> ids) {
        if (ids == null){
            return;
        }
        for (Integer id : ids) {
            // A probe instance carrying only the id is used to locate the queued entry.
            queue.remove(new TaskTimingRecord(id));
        }
    }

}
